{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "5293efa6-924f-4594-805a-d8fd6b23db32",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Requirement already satisfied: plotly in /opt/conda/lib/python3.11/site-packages (5.24.1)\n",
      "Requirement already satisfied: tenacity>=6.2.0 in /opt/conda/lib/python3.11/site-packages (from plotly) (9.0.0)\n",
      "Requirement already satisfied: packaging in /opt/conda/lib/python3.11/site-packages (from plotly) (24.1)\n"
     ]
    }
   ],
   "source": [
    "#!pip install delta-spark\n",
    "!pip install plotly"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "b8a32498-6b93-4f87-9e42-c57aeee53c34",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql import functions as F\n",
    "from delta import *\n",
    "import plotly.express as px\n",
    "#from IPython.display import IFrame\n",
    "\n",
    "import plotly.io as pio\n",
    "pio.renderers.default = \"iframe\"\n",
    "\n",
    "train_bronze_basepath = \"/data/delta/ts-spark_ch9_bronze_train\"\n",
    "train_silver_basepath = \"/data/delta/ts-spark_ch9_silver_train\"\n",
    "forecast_gold_basepath = \"/data/delta/ts-spark_ch9_gold_forecast\"\n",
    "eval_bronze_basepath = \"/data/delta/ts-spark_ch9_bronze_eval\"\n",
    "eval_silver_basepath = \"/data/delta/ts-spark_ch9_silver_eval\"\n",
    "\n",
    "runids = [1, 2, 3, 4, 5]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "6c5f8aa4-776a-4e48-97f6-bc448f16ab07",
   "metadata": {},
   "outputs": [],
   "source": [
    "builder = SparkSession.builder \\\n",
    "    .master(\"spark://spark-master:7077\") \\\n",
    "    .appName(\"ts-spark_ch9_data-ml-ops_results\") \\\n",
    "    .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\") \\\n",
    "    .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\")\n",
    "\n",
    "spark = configure_spark_with_delta_pip(builder).getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "93338e3e-425e-49eb-968f-501146db1dd8",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create SparkSession\n",
    "#spark = SparkSession.builder.master(\"local[*]\") \\\n",
    "#                    .appName('ts-spark_ch9_data-ml-ops_results') \\\n",
    "#                    .getOrCreate()\n",
    "#\n",
    "#print(f'The PySpark {spark.version} version is running...')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "2ae91013-3e5b-45fc-a0d9-54e3c7627460",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "\n",
       "        <div>\n",
       "            <p><b>SparkContext</b></p>\n",
       "\n",
       "            <p><a href=\"http://133b0a60e091:4040\">Spark UI</a></p>\n",
       "\n",
       "            <dl>\n",
       "              <dt>Version</dt>\n",
       "                <dd><code>v3.5.3</code></dd>\n",
       "              <dt>Master</dt>\n",
       "                <dd><code>spark://spark-master:7077</code></dd>\n",
       "              <dt>AppName</dt>\n",
       "                <dd><code>ts-spark_ch9_data-ml-ops_results</code></dd>\n",
       "            </dl>\n",
       "        </div>\n",
       "        "
      ],
      "text/plain": [
       "<SparkContext master=spark://spark-master:7077 appName=ts-spark_ch9_data-ml-ops_results>"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sc = spark.sparkContext\n",
    "sc"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "fba41689-2c53-41c2-ba01-d652c7ec57c3",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "runid: 1\n",
      "reading delta table /data/delta/ts-spark_ch9_bronze_train_1\n",
      "reading delta table /data/delta/ts-spark_ch9_silver_train_1\n",
      "reading delta table /data/delta/ts-spark_ch9_gold_forecast_1\n",
      "reading delta table /data/delta/ts-spark_ch9_bronze_eval_1\n",
      "reading delta table /data/delta/ts-spark_ch9_silver_eval_1\n",
      "runid: 2\n",
      "reading delta table /data/delta/ts-spark_ch9_bronze_train_2\n",
      "reading delta table /data/delta/ts-spark_ch9_silver_train_2\n",
      "reading delta table /data/delta/ts-spark_ch9_gold_forecast_2\n",
      "reading delta table /data/delta/ts-spark_ch9_bronze_eval_2\n",
      "reading delta table /data/delta/ts-spark_ch9_silver_eval_2\n",
      "runid: 3\n",
      "reading delta table /data/delta/ts-spark_ch9_bronze_train_3\n",
      "reading delta table /data/delta/ts-spark_ch9_silver_train_3\n",
      "reading delta table /data/delta/ts-spark_ch9_gold_forecast_3\n",
      "reading delta table /data/delta/ts-spark_ch9_bronze_eval_3\n",
      "reading delta table /data/delta/ts-spark_ch9_silver_eval_3\n",
      "runid: 4\n",
      "reading delta table /data/delta/ts-spark_ch9_bronze_train_4\n",
      "reading delta table /data/delta/ts-spark_ch9_silver_train_4\n",
      "reading delta table /data/delta/ts-spark_ch9_gold_forecast_4\n",
      "reading delta table /data/delta/ts-spark_ch9_bronze_eval_4\n",
      "reading delta table /data/delta/ts-spark_ch9_silver_eval_4\n",
      "runid: 5\n",
      "reading delta table /data/delta/ts-spark_ch9_bronze_train_5\n",
      "reading delta table /data/delta/ts-spark_ch9_silver_train_5\n",
      "reading delta table /data/delta/ts-spark_ch9_gold_forecast_5\n",
      "reading delta table /data/delta/ts-spark_ch9_bronze_eval_5\n",
      "reading delta table /data/delta/ts-spark_ch9_silver_eval_5\n"
     ]
    }
   ],
   "source": [
    "runid = 1\n",
    "print(f\"runid: {runid}\")\n",
    "print(f\"reading delta table {train_bronze_basepath}_{runid}\")\n",
    "train_bronze_sdf = spark.read.format(\"delta\").load(f\"{train_bronze_basepath}_{runid}\")\n",
    "train_bronze_sdf = train_bronze_sdf.withColumn('runid', F.lit(runid))\n",
    "print(f\"reading delta table {train_silver_basepath}_{runid}\")\n",
    "train_silver_sdf = spark.read.format(\"delta\").load(f\"{train_silver_basepath}_{runid}\")\n",
    "train_silver_sdf = train_silver_sdf.withColumn('runid', F.lit(runid))\n",
    "print(f\"reading delta table {forecast_gold_basepath}_{runid}\")\n",
    "forecast_gold_sdf = spark.read.format(\"delta\").load(f\"{forecast_gold_basepath}_{runid}\")\n",
    "forecast_gold_sdf = forecast_gold_sdf.withColumn('runid', F.lit(runid))\n",
    "print(f\"reading delta table {eval_bronze_basepath}_{runid}\")\n",
    "eval_bronze_sdf = spark.read.format(\"delta\").load(f\"{eval_bronze_basepath}_{runid}\")\n",
    "eval_bronze_sdf = eval_bronze_sdf.withColumn('runid', F.lit(runid))\n",
    "print(f\"reading delta table {eval_silver_basepath}_{runid}\")\n",
    "eval_silver_sdf = spark.read.format(\"delta\").load(f\"{eval_silver_basepath}_{runid}\")\n",
    "eval_silver_sdf = eval_silver_sdf.withColumn('runid', F.lit(runid))\n",
    "for runid in runids:\n",
    "    if runid > 1:\n",
    "        print(f\"runid: {runid}\")\n",
    "        print(f\"reading delta table {train_bronze_basepath}_{runid}\")\n",
    "        _train_bronze_sdf = spark.read.format(\"delta\").load(f\"{train_bronze_basepath}_{runid}\")\n",
    "        _train_bronze_sdf = _train_bronze_sdf.withColumn('runid', F.lit(runid))\n",
    "        train_bronze_sdf = train_bronze_sdf.union(_train_bronze_sdf)\n",
    "        print(f\"reading delta table {train_silver_basepath}_{runid}\")\n",
    "        _train_silver_sdf = spark.read.format(\"delta\").load(f\"{train_silver_basepath}_{runid}\")\n",
    "        _train_silver_sdf = _train_silver_sdf.withColumn('runid', F.lit(runid))\n",
    "        train_silver_sdf = train_silver_sdf.union(_train_silver_sdf)\n",
    "        print(f\"reading delta table {forecast_gold_basepath}_{runid}\")\n",
    "        _forecast_gold_sdf = spark.read.format(\"delta\").load(f\"{forecast_gold_basepath}_{runid}\")\n",
    "        _forecast_gold_sdf = _forecast_gold_sdf.withColumn('runid', F.lit(runid))\n",
    "        forecast_gold_sdf = forecast_gold_sdf.union(_forecast_gold_sdf)\n",
    "        print(f\"reading delta table {eval_bronze_basepath}_{runid}\")\n",
    "        _eval_bronze_sdf = spark.read.format(\"delta\").load(f\"{eval_bronze_basepath}_{runid}\")\n",
    "        _eval_bronze_sdf = _eval_bronze_sdf.withColumn('runid', F.lit(runid))\n",
    "        eval_bronze_sdf = eval_bronze_sdf.union(_eval_bronze_sdf)\n",
    "        print(f\"reading delta table {eval_silver_basepath}_{runid}\")\n",
    "        _eval_silver_sdf = spark.read.format(\"delta\").load(f\"{eval_silver_basepath}_{runid}\")\n",
    "        _eval_silver_sdf = _eval_silver_sdf.withColumn('runid', F.lit(runid))\n",
    "        eval_silver_sdf = eval_silver_sdf.union(_eval_silver_sdf)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "a1c554e4-41de-402d-a9bb-b4fd4000cd65",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- date: date (nullable = true)\n",
      " |-- daily_min_temperature: string (nullable = true)\n",
      " |-- _c2: string (nullable = true)\n",
      " |-- runid: integer (nullable = false)\n",
      "\n",
      "+----------+---------------------+----+-----+\n",
      "|      date|daily_min_temperature| _c2|runid|\n",
      "+----------+---------------------+----+-----+\n",
      "|1981-01-01|                 20.7|NULL|    1|\n",
      "|1981-01-02|                 17.9|NULL|    1|\n",
      "|1981-01-03|                 18.8|NULL|    1|\n",
      "|1981-01-04|                 14.6|NULL|    1|\n",
      "|1981-01-05|                 15.8|NULL|    1|\n",
      "|1981-01-06|                 15.8|NULL|    1|\n",
      "|1981-01-07|                 15.8|NULL|    1|\n",
      "|1981-01-08|                 17.4|NULL|    1|\n",
      "|1981-01-09|                 21.8|NULL|    1|\n",
      "|1981-01-10|                 20.0|NULL|    1|\n",
      "+----------+---------------------+----+-----+\n",
      "only showing top 10 rows\n",
      "\n",
      "count: 9125\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<iframe\n",
       "    scrolling=\"no\"\n",
       "    width=\"100%\"\n",
       "    height=\"545px\"\n",
       "    src=\"iframe_figures/figure_7.html\"\n",
       "    frameborder=\"0\"\n",
       "    allowfullscreen\n",
       "></iframe>\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "train_bronze_sdf.printSchema()\n",
    "train_bronze_sdf.show(10)\n",
    "print(f\"count: {train_bronze_sdf.count()}\")\n",
    "\n",
    "train_bronze_sdf = train_bronze_sdf.withColumn('daily_min_temperature', train_bronze_sdf['daily_min_temperature'].cast(\"float\").alias('daily_min_temperature'))\n",
    "\n",
    "fig = px.scatter(train_bronze_sdf, x='date', y='daily_min_temperature')\n",
    "fig.write_html('train_bronze_sdf.html', auto_open=True)\n",
    "#IFrame(src='train_bronze_sdf.html', width=900, height=600)\n",
    "fig.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "9a66c4ee-c20c-4c2a-a704-cd0640eb6097",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- ds: date (nullable = true)\n",
      " |-- y: double (nullable = true)\n",
      " |-- runid: integer (nullable = false)\n",
      "\n",
      "+----------+----+-----+\n",
      "|        ds|   y|runid|\n",
      "+----------+----+-----+\n",
      "|1981-01-01|20.7|    1|\n",
      "|1981-01-02|17.9|    1|\n",
      "|1981-01-03|18.8|    1|\n",
      "|1981-01-04|14.6|    1|\n",
      "|1981-01-05|15.8|    1|\n",
      "|1981-01-06|15.8|    1|\n",
      "|1981-01-07|15.8|    1|\n",
      "|1981-01-08|17.4|    1|\n",
      "|1981-01-09|21.8|    1|\n",
      "|1981-01-10|20.0|    1|\n",
      "+----------+----+-----+\n",
      "only showing top 10 rows\n",
      "\n",
      "count: 9117\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<iframe\n",
       "    scrolling=\"no\"\n",
       "    width=\"100%\"\n",
       "    height=\"545px\"\n",
       "    src=\"iframe_figures/figure_8.html\"\n",
       "    frameborder=\"0\"\n",
       "    allowfullscreen\n",
       "></iframe>\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "train_silver_sdf.printSchema()\n",
    "train_silver_sdf.show(10)\n",
    "print(f\"count: {train_silver_sdf.count()}\")\n",
    "\n",
    "fig = px.scatter(train_silver_sdf, x='ds', y='y')\n",
    "fig.write_html('train_silver_sdf.html', auto_open=True)\n",
    "#IFrame(src='train_silver_sdf.html', width=900, height=600)\n",
    "fig.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "00a580fb-ff41-41ac-a64d-52236d9d5040",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- ds: timestamp (nullable = true)\n",
      " |-- yhat: double (nullable = true)\n",
      " |-- yhat_lower: double (nullable = true)\n",
      " |-- yhat_upper: double (nullable = true)\n",
      " |-- runid: integer (nullable = false)\n",
      "\n",
      "+-------------------+------------------+------------------+------------------+-----+\n",
      "|                 ds|              yhat|        yhat_lower|        yhat_upper|runid|\n",
      "+-------------------+------------------+------------------+------------------+-----+\n",
      "|1986-07-02 00:00:00|7.0502054294413465| 3.307358146371069|10.465938828776352|    1|\n",
      "|1986-07-03 00:00:00| 6.587560735891971|  2.97254994764213|10.167342777711973|    1|\n",
      "|1986-07-04 00:00:00| 6.824346417179263| 3.520005151593687|10.381008117982446|    1|\n",
      "|1986-07-05 00:00:00| 6.665679404797449|3.2559132714407397|10.206536118631064|    1|\n",
      "|1986-07-06 00:00:00| 6.499936967527267|3.2156487525403317| 10.40039431933827|    1|\n",
      "|1986-07-07 00:00:00| 6.975215084511728|3.3075225241951824|10.432759147392009|    1|\n",
      "|1986-07-08 00:00:00| 7.041646186214637|  3.32037206791148|10.366183482277654|    1|\n",
      "|1986-07-09 00:00:00| 6.824035282255436|  3.31362866807389|10.488883775432196|    1|\n",
      "|1986-07-10 00:00:00| 6.325744990411982|2.5936338349207335| 9.760439583971223|    1|\n",
      "|1986-07-11 00:00:00| 6.532823609329169| 3.128594581574371|10.218518394731852|    1|\n",
      "+-------------------+------------------+------------------+------------------+-----+\n",
      "only showing top 10 rows\n",
      "\n",
      "count: 1825\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<iframe\n",
       "    scrolling=\"no\"\n",
       "    width=\"100%\"\n",
       "    height=\"545px\"\n",
       "    src=\"iframe_figures/figure_9.html\"\n",
       "    frameborder=\"0\"\n",
       "    allowfullscreen\n",
       "></iframe>\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "forecast_gold_sdf.printSchema()\n",
    "forecast_gold_sdf.show(10)\n",
    "print(f\"count: {forecast_gold_sdf.count()}\")\n",
    "\n",
    "fig = px.scatter(forecast_gold_sdf, x='ds', y='yhat', color='runid')\n",
    "fig.write_html('forecast_gold_sdf.html', auto_open=True)\n",
    "#IFrame(src='forecast_gold_sdf.html', width=900, height=600)\n",
    "fig.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "091a2e91-c2d3-440d-837f-a8b8b9b4b434",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- date: date (nullable = true)\n",
      " |-- daily_min_temperature: string (nullable = true)\n",
      " |-- _c2: string (nullable = true)\n",
      " |-- runid: integer (nullable = false)\n",
      "\n",
      "+----------+---------------------+----+-----+\n",
      "|      date|daily_min_temperature| _c2|runid|\n",
      "+----------+---------------------+----+-----+\n",
      "|1981-01-01|                 20.7|NULL|    1|\n",
      "|1981-01-02|                 17.9|NULL|    1|\n",
      "|1981-01-03|                 18.8|NULL|    1|\n",
      "|1981-01-04|                 14.6|NULL|    1|\n",
      "|1981-01-05|                 15.8|NULL|    1|\n",
      "|1981-01-06|                 15.8|NULL|    1|\n",
      "|1981-01-07|                 15.8|NULL|    1|\n",
      "|1981-01-08|                 17.4|NULL|    1|\n",
      "|1981-01-09|                 21.8|NULL|    1|\n",
      "|1981-01-10|                 20.0|NULL|    1|\n",
      "+----------+---------------------+----+-----+\n",
      "only showing top 10 rows\n",
      "\n",
      "count: 10950\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<iframe\n",
       "    scrolling=\"no\"\n",
       "    width=\"100%\"\n",
       "    height=\"545px\"\n",
       "    src=\"iframe_figures/figure_10.html\"\n",
       "    frameborder=\"0\"\n",
       "    allowfullscreen\n",
       "></iframe>\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "eval_bronze_sdf.printSchema()\n",
    "eval_bronze_sdf.show(10)\n",
    "print(f\"count: {eval_bronze_sdf.count()}\")\n",
    "\n",
    "eval_bronze_sdf = eval_bronze_sdf.withColumn('daily_min_temperature', eval_bronze_sdf['daily_min_temperature'].cast(\"float\").alias('daily_min_temperature'))\n",
    "\n",
    "fig = px.scatter(eval_bronze_sdf, x='date', y='daily_min_temperature')\n",
    "fig.write_html('eval_bronze_sdf.html', auto_open=True)\n",
    "#IFrame(src='eval_bronze_sdf.html', width=900, height=600)\n",
    "fig.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "219b859d-2f53-43b3-8055-52361fbcf6c4",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- ds: date (nullable = true)\n",
      " |-- y: double (nullable = true)\n",
      " |-- runid: integer (nullable = false)\n",
      "\n",
      "+----------+----+-----+\n",
      "|        ds|   y|runid|\n",
      "+----------+----+-----+\n",
      "|1981-01-01|20.7|    1|\n",
      "|1981-01-02|17.9|    1|\n",
      "|1981-01-03|18.8|    1|\n",
      "|1981-01-04|14.6|    1|\n",
      "|1981-01-05|15.8|    1|\n",
      "|1981-01-06|15.8|    1|\n",
      "|1981-01-07|15.8|    1|\n",
      "|1981-01-08|17.4|    1|\n",
      "|1981-01-09|21.8|    1|\n",
      "|1981-01-10|20.0|    1|\n",
      "+----------+----+-----+\n",
      "only showing top 10 rows\n",
      "\n",
      "count: 10942\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<iframe\n",
       "    scrolling=\"no\"\n",
       "    width=\"100%\"\n",
       "    height=\"545px\"\n",
       "    src=\"iframe_figures/figure_11.html\"\n",
       "    frameborder=\"0\"\n",
       "    allowfullscreen\n",
       "></iframe>\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "eval_silver_sdf.printSchema()\n",
    "eval_silver_sdf.show(10)\n",
    "print(f\"count: {eval_silver_sdf.count()}\")\n",
    "\n",
    "fig = px.scatter(eval_silver_sdf, x='ds', y='y')\n",
    "fig.write_html('eval_silver_sdf.html', auto_open=True)\n",
    "#IFrame(src='eval_silver_sdf.html', width=900, height=600)\n",
    "fig.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "1cd6d927-d0d8-4944-8b9a-dc383e0d7692",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- date: date (nullable = true)\n",
      " |-- runid: integer (nullable = false)\n",
      " |-- ds: timestamp (nullable = true)\n",
      " |-- yhat: double (nullable = true)\n",
      " |-- yhat_lower: double (nullable = true)\n",
      " |-- yhat_upper: double (nullable = true)\n",
      " |-- y: double (nullable = true)\n",
      "\n",
      "+----------+-----+-------------------+------------------+------------------+------------------+----+\n",
      "|      date|runid|                 ds|              yhat|        yhat_lower|        yhat_upper|   y|\n",
      "+----------+-----+-------------------+------------------+------------------+------------------+----+\n",
      "|1986-01-01|    1|1986-01-01 00:00:00|15.304088658556083|11.762421919032162|18.681151694847234|12.9|\n",
      "|1986-01-02|    1|1986-01-02 00:00:00|14.912985035062876| 11.35717035259453|18.553112375066544|13.8|\n",
      "|1986-01-03|    1|1986-01-03 00:00:00|15.218134049807485| 11.83353839921466|18.931096197141773|10.6|\n",
      "|1986-01-04|    1|1986-01-04 00:00:00|15.124372509163033|11.502590066069416|18.716316207313586|12.6|\n",
      "|1986-01-05|    1|1986-01-05 00:00:00|15.019784617956768|11.548214646627523|18.406686807350148|13.7|\n",
      "|1986-01-06|    1|1986-01-06 00:00:00|15.552164885957072| 11.97599398677896|19.162043333860073|12.6|\n",
      "|1986-01-07|    1|1986-01-07 00:00:00|15.671340986960892|12.260233922548402| 19.18265150786714|13.1|\n",
      "|1986-01-08|    1|1986-01-08 00:00:00|15.501815500249823|11.905513084488348|18.973438545037492|15.4|\n",
      "|1986-01-09|    1|1986-01-09 00:00:00|15.046656942798247| 11.61701723651068| 18.81085692998812|11.9|\n",
      "|1986-01-10|    1|1986-01-10 00:00:00|15.291634114041461|11.702322681460581| 18.98008334587187|13.8|\n",
      "+----------+-----+-------------------+------------------+------------------+------------------+----+\n",
      "only showing top 10 rows\n",
      "\n",
      "count: 1824\n"
     ]
    }
   ],
   "source": [
    "forecast_gold_sdf_join = forecast_gold_sdf.withColumn('date', forecast_gold_sdf['ds'].cast('date'))\n",
    "eval_silver_sdf_forjoin = eval_silver_sdf.withColumnRenamed('ds', 'date')\n",
    "\n",
    "forecast_gold_sdf_join = forecast_gold_sdf_join.join(eval_silver_sdf_forjoin, ['date', 'runid'], \"inner\")\n",
    "forecast_gold_sdf_join = forecast_gold_sdf_join.sort(['runid', 'date'])\n",
    "\n",
    "forecast_gold_sdf_join.printSchema()\n",
    "forecast_gold_sdf_join.show(10)\n",
    "print(f\"count: {forecast_gold_sdf_join.count()}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "b02d2082-7b13-4b74-990a-bfe445420af9",
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<iframe\n",
       "    scrolling=\"no\"\n",
       "    width=\"100%\"\n",
       "    height=\"545px\"\n",
       "    src=\"iframe_figures/figure_13.html\"\n",
       "    frameborder=\"0\"\n",
       "    allowfullscreen\n",
       "></iframe>\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "fig1 = px.line(forecast_gold_sdf_join, x='date', y=['yhat_lower', 'yhat', 'yhat_upper'], color_discrete_sequence = ['rgba(10,10,10,0.2)'])\n",
    "fig2 = px.scatter(forecast_gold_sdf_join, x='date', y=['y'], color='runid') \\\n",
    "        .add_trace(fig1.data[0]) \\\n",
    "        .add_trace(fig1.data[1]) \\\n",
    "        .add_trace(fig1.data[2])\n",
    "fig2.update_layout(showlegend=False)\n",
    "fig2.write_html('forecast_gold_sdf_join.html', auto_open=True)\n",
    "#IFrame(src='forecast_gold_sdf_join.html', width=900, height=600)\n",
    "fig2.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "17c5ee49-8ab1-4aef-b9c7-4cdfb7717e90",
   "metadata": {},
   "outputs": [],
   "source": [
    "#spark.sparkContext.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
